#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <limits.h>
#include <unistd.h>
#include <stdint.h>
#include <time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/queue.h>
#include <assert.h>
#include <stdbool.h>
#include <sys/syscall.h>
#include <sys/socket.h>
#include <mtcp_api.h>
#include <mtcp_epoll.h>
#include "cpu.h"
#include "rss.h"
#include "http_parsing.h"
#include "debug.h"
#include "../definition.h"
#include "pm.h"



/*----------------------------------------------------------------------------*/
/*
 * Allocate a per-core thread context and attach a fresh mTCP context to it.
 *
 * @param core  CPU core index; the mTCP context is also published in
 *              g_mctx[core].
 * @return      the new context, or NULL if either allocation fails (nothing
 *              is leaked in that case).
 */
thread_context_t 
CreateContext(int core)
{
	thread_context_t ctx;

	ctx = (thread_context_t)calloc(1, sizeof(struct thread_context));
	if (!ctx) {
		perror("malloc");
		TRACE_ERROR("Failed to allocate memory for thread context.\n");
		return NULL;
	}
	ctx->core = core;

	ctx->mctx = mtcp_create_context(core);
	if (!ctx->mctx) {
		TRACE_ERROR("Failed to create mtcp context.\n");
		/* Release the half-built context instead of leaking it. */
		free(ctx);
		return NULL;
	}
	g_mctx[core] = ctx->mctx;

	return ctx;
}
/*----------------------------------------------------------------------------*/
/*
 * Tear down a context created by CreateContext: destroy its mTCP context
 * first, then free the wrapper itself.
 */
void 
DestroyContext(thread_context_t ctx) 
{
	mctx_t owned_mctx = ctx->mctx;

	mtcp_destroy_context(owned_mctx);
	free(ctx);
}



/* Kernel thread id of the calling thread, fetched via the raw gettid syscall. */
int gettid() {
    return (int)syscall(SYS_gettid);
}

/*----------------------------------------------------------------------------*/

/*
 * Fill dst with a `len`-character string: `prefix` (if non-NULL) followed by
 * characters drawn uniformly (via rand()) from the NUL-terminated alphabet
 * `src`. dst is NUL-terminated at dst[len], like gen_mono_increase, so it
 * must have room for len + 1 bytes.
 *
 * A prefix longer than len is truncated to len characters. An empty
 * alphabet yields just the prefix.
 */
void gen_random(const char *src, char *dst, int len, char *prefix) {
    size_t total = len > 0 ? (size_t)len : 0;
    size_t prefix_len = 0;
    /* strlen, not sizeof: src is a pointer, so sizeof(src) is the pointer size. */
    size_t src_len = strlen(src);
    size_t i;

    if (prefix != NULL) {
        prefix_len = strlen(prefix);
        if (prefix_len > total)
            prefix_len = total;
        memcpy(dst, prefix, prefix_len);
    }
    if (src_len == 0)
        total = prefix_len; /* nothing to draw from; avoid modulo by zero */
    for (i = prefix_len; i < total; i++)
        dst[i] = src[(size_t)rand() % src_len];
    dst[total] = '\0';
}

/*
 * Write a `len`-character, zero-padded decimal rendering of `number` into dst,
 * preceded by `prefix` if non-NULL. For example, len=8, prefix "4" and
 * number 42 give "40000042". dst is NUL-terminated at dst[len], so it must
 * hold len + 1 bytes.
 *
 * If the number needs more digits than remain after the prefix, only its
 * leading digits are kept, so dst never grows past len characters. A prefix
 * longer than len is truncated.
 */
void gen_mono_increase(char *dst, int len, char *prefix, long number) {
    size_t total = len > 0 ? (size_t)len : 0;
    size_t prefix_len = 0;
    size_t avail, digits, pad_len;
    int n;

    if (prefix != NULL) {
        prefix_len = strlen(prefix);
        if (prefix_len > total)
            prefix_len = total;
        memcpy(dst, prefix, prefix_len);
    }
    avail = total - prefix_len;

    n = snprintf(NULL, 0, "%ld", number);
    assert(n > 0);
    digits = (size_t)n;
    /* Guard the subtraction: a wide number would otherwise wrap pad_len. */
    pad_len = digits < avail ? avail - digits : 0;

    /* Pad directly in dst; no bounded scratch buffer to overrun. */
    memset(dst + prefix_len, '0', pad_len);
    snprintf(dst + prefix_len + pad_len, avail - pad_len + 1, "%ld", number);
}


/*
 * Build the global k_v_pair_list with `num_keys` zero-initialised entries.
 *
 * When gen_fix_keys is set, keys are `prefix` + zero-padded sequence numbers
 * starting at 1, and values are the same numbers. Otherwise keys and values
 * are random strings drawn from key_text / value_text. Keys are k_length
 * characters long and values v_length. The list is shared by all worker
 * threads.
 *
 * Exits the process if memory cannot be allocated.
 */
void init_k_v_pair_list(int num_keys, char *prefix) {
    long global_key_seq_no = 1;
    int i;

    k_v_pair_list = calloc(num_keys > 0 ? (size_t)num_keys : 0,
                           sizeof *k_v_pair_list);
    if (k_v_pair_list == NULL && num_keys > 0) {
        perror("init_k_v_pair_list");
        exit(EXIT_FAILURE);
    }
    for (i = 0; i < num_keys; i++) {
        /* calloc so unused key/value bytes are zero (keeps strings terminated). */
        k_v_pair_list[i] = calloc(1, sizeof *k_v_pair_list[i]);
        if (k_v_pair_list[i] == NULL) {
            perror("init_k_v_pair_list");
            exit(EXIT_FAILURE);
        }

        if (gen_fix_keys) {
            gen_mono_increase(k_v_pair_list[i]->key, k_length, prefix, global_key_seq_no);
            gen_mono_increase(k_v_pair_list[i]->value, v_length, NULL, global_key_seq_no);
            global_key_seq_no++;
        } else {
            gen_random(key_text, k_v_pair_list[i]->key, k_length, prefix);
            gen_random(value_text, k_v_pair_list[i]->value, v_length, NULL);
        }
    }
}

/*
 * Free the first `num_keys` entries of the global k_v_pair_list and the list
 * itself, then reset the global to NULL.
 *
 * Releasing an absent list (never built, or already released) is a no-op.
 * Previously it terminated the process with a failure status.
 */
void release_k_v_pair_list(int num_keys) {
    int i;

    if (k_v_pair_list == NULL)
        return;
    for (i = 0; i < num_keys; i++)
        free(k_v_pair_list[i]); /* free(NULL) is a no-op */
    free(k_v_pair_list);
    /* Avoid a dangling global and make a second release harmless. */
    k_v_pair_list = NULL;
}

/*
 * Read the x86 time-stamp counter. RDTSC returns the low 32 bits in EAX and
 * the high 32 bits in EDX; they are reassembled into one 64-bit value.
 * x86/x86-64 only.
 */
static __inline__ unsigned long long rdtsc(void)
{
  unsigned lo_word, hi_word;
  unsigned long long tsc;

  __asm__ __volatile__ ("rdtsc" : "=a"(lo_word), "=d"(hi_word));
  tsc = hi_word;
  tsc <<= 32;
  tsc |= lo_word;
  return tsc;
}

char *get_value_by_key(char *key, int k_len) {
        int i=0;
    for (i=0; i<num_keys; i++) {
        if (0 == strncmp(key, k_v_pair_list[i]->key, k_len)) {
            return k_v_pair_list[i]->value;
        }
    }
    return NULL;
}


/*----------------------------------------------------------------------------*/
#ifdef DBGMSG
uint32_t pre_times = 0;
time_t print_interval = 0;
#endif
/*
 * Send (or resume sending) one request on a connection.
 *
 * @param ctx     per-core thread context.
 * @param sockid  fake fd (index into ctx->wvars); the mTCP socket is
 *                ctx->wvars[sockid].fd.
 * @param wv      per-connection state; wv->len_send counts the bytes of the
 *                current dt_len-byte request already written.
 * @return        always 0.
 *
 * Once sum_iter seconds have elapsed since time_start, the connection is
 * closed instead of sending.
 *
 * A new request is composed only when nothing of the previous one is
 * outstanding (len_send == 0). It uses a random key and is a SET or GETK
 * depending on set_opt; when wv->resend is set, the request already in
 * kv_buf is replayed instead. Writes always continue from kv_buf + len_send,
 * so a short write never splices a fresh request into a partially sent one.
 *
 * When the whole request is out, the socket is switched back to
 * edge-triggered EPOLLIN.
 */
static inline int 
SendHTTPRequest(thread_context_t ctx, int sockid, struct wget_vars *wv)
{
	struct mtcp_epoll_event ev;
	int realfd = ctx->wvars[sockid].fd;
	int wr;
	int key_idx;

	wv->headerset = FALSE;
	wv->recv = 0;
	wv->header_len = wv->file_len = 0;

	gettimeofday(&time_current, NULL);
	time_interval = time_current.tv_sec - time_start.tv_sec;

	if (!(time_interval < sum_iter)) {
		/* Test duration is over: retire the connection and stop here, so the
		 * closed socket is not touched again below. */
		printf("2\n");
		CloseConnection(ctx, realfd);
		ctx->stat.completes++;
		return 0;
	}

	wv->len_out = dt_len - wv->len_send;
	while (wv->len_out > 0) {
		if (wv->len_send == 0) {
			key_idx = rand() % num_keys;
			if (set_opt)
				compose_binary_set(wv->kv_buf, k_v_pair_list[key_idx]->key,
				                   k_v_pair_list[key_idx]->value,
				                   CMD_SET, 0, 0);
			else if (!wv->resend)
				compose_binary_get(wv->kv_buf, k_v_pair_list[key_idx]->key,
				                   CMD_GETK, 0, 0);
			else
				wv->resend = 0;	/* kv_buf already holds the request to replay */
		}
		wr = mtcp_write(ctx->mctx, realfd, wv->kv_buf + wv->len_send, wv->len_out);
		if (wr > 0) {
			ctx->stat.writes += wr;
			wv->len_out -= wr;
			wv->len_send += wr;
		} else if (errno == EAGAIN) {
			TRACE_ERROR("Socket %d: Sending HTTP not finished.\n", realfd);
			break;
		} else {
			printf("Socket %d: remote close.\n", realfd);
			printf("1\n");
			CloseConnection(ctx, realfd);
			ctx->stat.errors++;
			return 0;
		}
	}
	wv->write_times++;
#ifdef DBGMSG
	if (time_interval - print_interval >= 1) {
		printf("[%d] Write times: %d\n", ctx->core,  wv->write_times);
		pre_times = wv->write_times;
		print_interval = time_interval;
	}
#endif

	if (wv->len_send == dt_len) {
		/* Whole request sent: wait for the response. */
		ev.events = MTCP_EPOLLIN | MTCP_EPOLLET;
		ev.data.sockid = realfd;
		mtcp_epoll_ctl(ctx->mctx, ctx->ep, MTCP_EPOLL_CTL_MOD, realfd, &ev);
		wv->len_send = 0;
	}

	return 0;
}

/*
 * Count one latency sample `time` in the histogram time_slot[core].
 *
 * Bins 0 .. SLOT_NUM-2 cover [time_bin[i], time_bin[i+1]]. A value on a
 * shared edge goes to the lower bin. Samples above time_bin[SLOT_NUM-1] go
 * to the overflow slot SLOT_NUM-1.
 *
 * The loop now also visits bin SLOT_NUM-2. Previously samples in
 * (time_bin[SLOT_NUM-2], time_bin[SLOT_NUM-1]] were silently dropped, even
 * though log_thread reads that bin. Samples below time_bin[0] are still not
 * counted.
 */
void assign_bin (unsigned long time, int core)
{
    int i;

    for (i = 0; i < SLOT_NUM - 1; i++) {
        if (time >= time_bin[i] && time <= time_bin[i + 1]) {
            time_slot[core][i]++;
            return;
        }
    }
    if (time > time_bin[SLOT_NUM - 1])
        time_slot[core][SLOT_NUM - 1]++;
}

void process_response(thread_context_t ctx, char *buf, int len, int core, struct wget_vars *wv) {

        binary_header_t *h;
        uint16_t k_len;
        uint32_t body_len;
        uint8_t ext_len;
        unsigned long time_interval;
        int v_len;
        char *exp_get_value;
        int op_type;
	if(set_opt)
		assert(len>=24);
	else
        	assert(len >=36);

        h = (binary_header_t *)(buf);
        k_len = ntohs(h->key_len);
        body_len = ntohl(h->body_len);
        ext_len = h->extra_len;
        v_len = body_len - k_len - ext_len;
        op_type = h->opcode;
/*int tmpi;
for (tmpi = 0; tmpi < 24; tmpi++) printf("[%x]", (unsigned int)buf[tmpi]);printf("\n");*/
/*int ii=0;
	for (ii = 0; ii <len ; ++ii) {
                    fprintf(stderr, " 0x%02x", buf[ii]);
                }
                fprintf(stderr, "\n");*/
        if (CMD_GETK == op_type) {
            //assert(ext_len == 4 && v_len > 0);
                gettimeofday(&wv->pc_time, NULL);
                time_interval = US_PER_SECOND * (wv->pc_time.tv_sec - wv->pp_time.tv_sec) + wv->pc_time.tv_usec - wv->pp_time.tv_usec;

                wv->pp_time = wv->pc_time;
                assign_bin(time_interval, core);
				thread_count[core] ++;
        }
        else if (op_type == CMD_SET) {
	//	if(ext_len!=0 || v_len!=0)
//printf("exlen %d,vlen %d\n",ext_len,v_len);
            assert(ext_len == 0 && v_len == 0);
            thread_count[core]++;
            gettimeofday(&wv->pc_time, NULL);
            time_interval  = US_PER_SECOND * (wv->pc_time.tv_sec - wv->pp_time.tv_sec) + wv->pc_time.tv_usec - wv->pp_time.tv_usec;

            wv->pp_time = wv->pc_time;
            assign_bin(time_interval, core);
        }
 }



/*----------------------------------------------------------------------------*/
/*
 * Drain readable data from a connection and account each chunk as a response.
 *
 * @param sockid  fake fd (index into ctx->wvars).
 * @param wv      that connection's state.
 * @param core    core index for the per-core counters.
 * @return        always 0.
 *
 * Reads are skipped while read_times is ahead of write_times.
 *
 * The connection is closed in three cases: EOF; the test time has expired
 * (time_interval > sum_iter) on EAGAIN; or a read error. After a close the
 * function returns immediately instead of issuing the next request on the
 * closed socket.
 *
 * Otherwise the next request is triggered: either by re-arming EPOLLOUT
 * (EPOLL_SEND builds) or by calling SendHTTPRequest directly when no
 * migration target (tmpfd) is pending.
 */
static inline int
HandleReadEvent(thread_context_t ctx, int sockid, struct wget_vars *wv, int core)
{
	mctx_t mctx = ctx->mctx;
	int realfd = ctx->wvars[sockid].fd;
	char buf[BUF_SIZE];
	int rd = 1;

	if (wv->read_times > wv->write_times)
		return 0;

	while (rd > 0) {
		rd = mtcp_read(mctx, realfd, buf, BUF_SIZE);
		if (rd <= 0)
			break;
		process_response(ctx, buf, rd, core, wv);
		ctx->stat.reads += rd;
		wv->recv += rd;
		TRACE_APP("Socket %d: mtcp_read ret: %d, total_recv: %lu, "
				"header_set: %d, header_len: %u, file_len: %lu\n",
				realfd, rd, wv->recv,
				wv->headerset, wv->header_len, wv->file_len);
	}
	wv->read_times++;

	/* The loop only exits with rd <= 0. */
	if (rd == 0) {
		TRACE_INFO("Socket %d connection closed with server.\n", realfd);
		printf("3\n");
		CloseConnection(ctx, realfd);
		return 0;
	}
	if (errno == EAGAIN) {
		if (time_interval > sum_iter) {
			printf("4\n");
			CloseConnection(ctx, realfd);
			ctx->stat.completes++;
			return 0;
		}
	} else {
		TRACE_INFO("Socket %d: mtcp_read() error %s\n", 
				realfd, strerror(errno));
		ctx->stat.errors++;
		ctx->errors++;
		printf("5\n");
		CloseConnection(ctx, realfd);
		return 0;
	}

#ifdef EPOLL_SEND
	{
		struct mtcp_epoll_event ev;

		ev.events = MTCP_EPOLLOUT | MTCP_EPOLLET;
		ev.data.sockid = realfd;
		mtcp_epoll_ctl(ctx->mctx, ctx->ep, MTCP_EPOLL_CTL_MOD, realfd, &ev);
	}
#else
	if (wv->tmpfd == 0)
		SendHTTPRequest(ctx, sockid, wv);
#endif

	return 0;
}

/*
 * Dump `len` bytes of `buf` to stderr, four bytes per line, framed by
 * "< context dump start" / "dump end >" markers.
 *
 * @param mode  1 = hex ("0x%02x" per byte); any other value = raw character.
 */
void dump_buffer_content(const char *context, char *buf, int len, int mode) {
    int i;

    fprintf(stderr, "\n< %s dump start  ", context);
    for (i = 0; i < len; i++) {
        /* Go through unsigned char so bytes >= 0x80 print as 0x80..0xff
         * rather than sign-extended 0xffffff80.. where char is signed. */
        unsigned char byte = (unsigned char)buf[i];

        if (0 == i % 4)
            fprintf(stderr, "\n dump > ");
        if (1 == mode) /* hex mode */
            fprintf(stderr, "0x%02x ", (unsigned)byte);
        else /* char mode */
            fprintf(stderr, "%c ", buf[i]);
    }
    fprintf(stderr, "  dump end >\n\n");
}

/*
 * Serialise a memcached binary GET-family request (opcode `cmd`) for `key`
 * into `request`.
 *
 * When header_op == 1, two TCPheader records (type bytes 0x61 then 0x60)
 * precede the binary header. The request buffer must be large enough for
 * everything written.
 *
 * ip_rank and port are placed in the header's opaque fields; the low byte of
 * ip_rank is also stored in the TCPheaders.
 *
 * @return number of bytes written. This is computed from the sizes actually
 *         copied rather than the hard-coded 48/24 used previously, which
 *         assumed sizeof(TCPheader) == 12 and a 24-byte request header.
 */
int compose_binary_get(char *request, char *key, int cmd, int ip_rank, int port) {
    uint16_t key_len = (uint16_t)strlen(key);
    uint8_t hip = (uint8_t)(0x00ff & ip_rank);
    size_t off = 0;
    binary_get_req_header h = {0x80, cmd, htons(key_len),
                               0x00, 0x00, {htons(0)},
                               htonl(key_len),
                               htons(ip_rank), htons(port),
                               0x0000000000000000};

    if (header_op == 1) {
        TCPheader tcph = {0x61, hip, htons(port), htons(1), htons((uint32_t)(key_len + sizeof(h) + sizeof(TCPheader))), htonl(0)};
        TCPheader innh = {0x60, hip, htons(port), htons(1), htons((uint32_t)(key_len + sizeof(h))), htonl(0)};

        memcpy(request, &tcph, sizeof(tcph));
        memcpy(request + sizeof(tcph), &innh, sizeof(innh));
        off = 2 * sizeof(TCPheader);
    }
    memcpy(request + off, &h, sizeof(h));
    memcpy(request + off + sizeof(h), key, key_len);
    return (int)(off + sizeof(h) + key_len);
}

/*
 * Serialise a memcached binary SET request for key/value into `request`.
 *
 * The opcode is always CMD_SET; the `cmd` argument is not used. The header
 * declares 8 bytes of extras and body length = key + 8 + value. When
 * header_op == 1, two TCPheader records (0x61 then 0x60) precede it.
 *
 * Offsets use sizeof(header), the same size main() uses for dt_len, instead
 * of a hard-coded 32.
 *
 * @return number of bytes written. The original declared int but never
 *         returned a value.
 */
int compose_binary_set(char *request, char *key, char *value,
                       int cmd, int ip_rank, int port) {
    size_t k_len = strlen(key);
    size_t v_len = strlen(value);
    size_t off = 0;
    binary_set_req_header header = { 0x80, CMD_SET, htons(k_len),
                                     0x08, 0x00, {htons(0)},
                                     htonl((uint32_t)(k_len + 8 + v_len)),
                                     {htons(ip_rank), htons(port)},
                                     0x0000000000000000, 0x0000000000000000
                                   };

    if (header_op == 1) {
        TCPheader tcph = {0x61, 0x00, htons(0), htons(1), htons((uint32_t)(k_len + v_len + sizeof(header) + sizeof(TCPheader))), htonl(0)};
        TCPheader innh = {0x60, 0x00, htons(0), htons(1), htons((uint32_t)(k_len + sizeof(header) + v_len)), htonl(0)};

        memcpy(request, &tcph, sizeof(tcph));
        memcpy(request + sizeof(tcph), &innh, sizeof(innh));
        off = 2 * sizeof(TCPheader);
    }
    memcpy(request + off, &header, sizeof(header));
    memcpy(request + off + sizeof(header), key, k_len);
    memcpy(request + off + sizeof(header) + k_len, value, v_len);
    return (int)(off + sizeof(header) + k_len + v_len);
}


/*----------------------------------------------------------------------------*/
void *
RunWgetMain(void *arg)
{
	thread_context_t ctx;
    char random_state[16];
	mctx_t mctx;
	int core = *(int *)arg;
	struct in_addr daddr_in;
	int n, maxevents;
	int ep;
	struct mtcp_epoll_event *events;
	int nevents;
	struct wget_vars *wvars;
	int i;
	//int max_fds = 1000;
	struct timeval cur_tv, prev_tv;
	uint64_t cur_ts, prev_ts;


    initstate(seed, random_state, sizeof(random_state));


    //create_sample_fixpacket(1, request, dt_len);


	mtcp_core_affinitize(core);

	ctx = CreateContext(core);
	if (!ctx) {
		return NULL;
	}
	mctx = ctx->mctx;
	g_ctx[core] = ctx;
	g_stat[core] = &ctx->stat;


    //mtcp_init_rss(mctx, saddr, IP_RANGE, daddr, dport);
   // mtcp_init_rss(mctx, saddr, IP_RANGE, inet_addr(SERV_IP), htons(SERV_PORT));

	daddr_in.s_addr = daddr;
	//fprintf(stderr, "Thread %d handles %d flows. connecting to %s:%u\n", 
	//		core, n, inet_ntoa(daddr_in), ntohs(dport));

	/* Initialization */
	maxevents = max_fds * 3;
	ep = mtcp_epoll_create(mctx, maxevents);
	if (ep < 0) {
		TRACE_ERROR("Failed to create epoll struct!n");
		exit(EXIT_FAILURE);
	}
	events = (struct mtcp_epoll_event *)
			calloc(maxevents, sizeof(struct mtcp_epoll_event));
	if (!events) {
		TRACE_ERROR("Failed to allocate events!\n");
		exit(EXIT_FAILURE);
	}
	ctx->ep = ep;
	
	wvars = (struct wget_vars *)calloc(max_fds, sizeof(struct wget_vars));
	if (!wvars) {
		TRACE_ERROR("Failed to create wget variables!\n");
		exit(EXIT_FAILURE);
	}
	ctx->wvars = wvars;
    ctx->started = ctx->done = ctx->pending = 0;
	ctx->errors = ctx->incompletes = 0;
    memset(ctx->realfd_to_fakefd, 0, 65536);

    struct sockaddr_in addr;

    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = daddr;
    addr.sin_port = dport;

	while (ctx->pending < concurrency) {
        if (pm_createConnection(ctx,  (struct sockaddr *)&addr,  sizeof(struct sockaddr_in)) < 0) {
			done[core] = TRUE;
			break;
		}
	}

	//gettimeofday(&time_current, NULL);
	gettimeofday(&time_start, NULL);
	
	while (!done[core]) {

        nevents = pm_epoll_wait(ctx, ep, events, maxevents, -1);
		ctx->stat.waits++;
		if (ctx->stat.waits == 1) {
			gettimeofday(&time_start, NULL);
		}
		//printf("nevents: %d\n", nevents);	
		if (nevents < 0) {
			if (errno != EINTR) {
                TRACE_ERROR("pm_epoll_wait failed! ret: %d\n", nevents);
			}
			done[core] = TRUE;
			break;
		} else {
			ctx->stat.events += nevents;
		}

		for (i = 0; i < nevents; i++) {
            int realfd = ctx->wvars[events[i].data.sockid].fd;
			if (events[i].events & MTCP_EPOLLERR) {
				int err;
				socklen_t len = sizeof(err);

				TRACE_APP("[CPU %d] Error on socket %d\n", 
                        core, realfd);
				ctx->stat.errors++;
				ctx->errors++;
                if (mtcp_getsockopt(mctx, realfd,
							SOL_SOCKET, SO_ERROR, (void *)&err, &len) == 0) {
					if (err == ETIMEDOUT)
						ctx->stat.timedout++;
				}
                printf("6\n");
                CloseConnection(ctx, realfd);

            } else if (events[i].events & MTCP_EPOLLIN) {
				TRACE_APP("Event: %d, IN, read: %d\n", i,
                    wvars[events[i].data.sockid].read_times);

                HandleReadEvent(ctx,
                        realfd, &wvars[events[i].data.sockid],core);


            } else if (events[i].events & MTCP_EPOLLOUT) {
				TRACE_APP("Event: %d, OUT, write: %d\n", i,
					wvars[events[i].data.sockid].write_times);
				struct wget_vars *wv = &wvars[events[i].data.sockid];
				SendHTTPRequest(ctx, events[i].data.sockid, wv);
			} else {
				TRACE_ERROR("Socket %d: event: %s\n", 
                        realfd, EventToString(events[i].events));
				assert(0);
			}
		}

		// if (ctx->done >= ctx->target) {
		// 	fprintf(stdout, "[CPU %d] Completed %d connections, "
		// 			"errors: %d incompletes: %d\n", 
		// 			ctx->core, ctx->done, ctx->errors, ctx->incompletes);
		// 	break;
		// }
	}
	
	fprintf(stdout, "[CPU %d] Completed %d connections, "
		"errors: %d incompletes: %d\n", 
		ctx->core, ctx->done, ctx->errors, ctx->incompletes);

	TRACE_INFO("Wget thread %d waiting for mtcp to be destroyed.\n", core);
	DestroyContext(ctx);

	TRACE_DBG("Wget thread %d finished.\n", core);
	pthread_exit(NULL);
	return NULL;
}
/*----------------------------------------------------------------------------*/
void 
SignalHandler(int signum)
{
	int i;

	for (i = 0; i < core_limit; i++) {
		done[i] = TRUE;
	}
}
/*----------------------------------------------------------------------------*/



/*
 * Linearly interpolate where, inside the histogram bin [x0, x1], the
 * cumulative share of samples reaches LATENCY_PERCENT. y0 and y1 are the
 * cumulative shares at the bin's lower and upper edges.
 *
 * A degenerate interval (y1 <= y0, i.e. an empty bin) returns x0 instead of
 * dividing by zero.
 */
double calculate_latency(int x0, int x1, double y0, double y1)
{
    const double percent = LATENCY_PERCENT;
    double span = y1 - y0;

    if (span <= 0.0)
        return (double)x0;
    return (double)x0 + (double)(x1 - x0) * (percent - y0) / span;
}

void *log_thread(void *args) {
    int core = *(int *)args;
    ssize_t ret;
    int times = 0;

    int i;
    char buf;
    double latency;
    double packet_process_latency;
    uint64_t total_cnt, sum, pre_sum;
    uint64_t pre_num[SLOT_NUM];
    double percent = LATENCY_PERCENT;
    while (1) {
        sleep(1); // wait for 1 second
        uint32_t avg_resp;
        sum = 0;

        gettimeofday(&total_cur_time[core], NULL);

        uint64_t timetime = US_PER_SECOND * (total_cur_time[core].tv_sec - total_pre_time[core].tv_sec) + total_cur_time[core].tv_usec - total_pre_time[core].tv_usec;

        total_cnt = thread_count[core] - prv_count[core];
        packet_process_speed = (double) total_cnt * US_PER_SECOND / timetime;
        printf("times: %d \t Thread: %d \trps: %.2f\n",times, core, packet_process_speed);



        avg_resp = (thread_count[core] - prv_count[core]) / concurrency;
        packet_process_latency = (double)timetime / avg_resp;

        printf("times: %d \t Thread: %d \tavg: %.2f\n", times, core, packet_process_latency); //average latency


       /* for(i = 0; i < SLOT_NUM; i++)
                  {
                      printf("bin[%d] :%lf \n",i, (double) (time_slot[core][i] - pre_num[i])/total_cnt );
                  }*/
        if(packet_process_speed > 1)
        {
            for(i = 0; i < SLOT_NUM - 1; i++)
            {
                sum += time_slot[core][i] - pre_num[i];
                pre_num[i] = time_slot[core][i];
                if(sum >= total_cnt * percent)
                    break;
                pre_sum = sum;
            }

            if(i != SLOT_NUM - 1 )
                per_core_latency[core] = calculate_latency(time_bin[i], time_bin[i+1],(double)pre_sum / total_cnt, (double)sum /total_cnt);
            else
                per_core_latency[core] = time_bin[SLOT_NUM -1];
        }
        else
            per_core_latency[core] = 0;


        printf("times: %d \t Thread: %d \t99%: %.2f\n", times++, core, per_core_latency[core]); //average latency
        for( ; i < SLOT_NUM; i++)
        {
           pre_num[i] = time_slot[core][i];
        }
        total_pre_time[core] = total_cur_time[core];

        prv_count[core] = thread_count[core];
    }
}

  /*  void *log_thread(void *args) {
        int core = *(int *)args;
        ssize_t ret;
        char buf;
        while (1) {
            if (thread_count[core] - prv_count[core] > print_size) {
                uint32_t avg_resp;

                gettimeofday(&total_cur_time[core], NULL);
                uint64_t timetime = US_PER_SECOND * (total_cur_time[core].tv_sec - total_pre_time[core].tv_sec) + total_cur_time[core].tv_usec - total_pre_time[core].tv_usec;

                packet_process_speed = (double)(thread_count[core] - prv_count[core]) *US_PER_SECOND / timetime;
                printf("[Throughput]\tThread: %d \trps: %.2f\n", core, packet_process_speed);


                avg_resp = (thread_count[core] - prv_count[core]) / concurrency;
                packet_process_latency = (double)timetime / avg_resp;
                per_core_latency[core] = packet_process_latency;
                printf("[Latency]\tThread: %d \tus: %.2f\n", core, packet_process_latency); //average latency

                if (2000000 > timetime ) {
                    print_size = print_size * 2;
                }
                else if (4000000 < timetime ) {
                    print_size= print_size / 2;
                }
                total_pre_time[core] = total_cur_time[core];

                prv_count[core] = thread_count[core];
            }
        }
}*/



int 
main(int argc, char **argv)
{
	int cores[MAX_CPUS];

	int ret;
	int i;
	struct mtcp_conf mcfg;


	if (argc != 11) {
		printf("Usage: ./tcp_cli ip_addr port_num  sum_iter concurrency cpu_cores gen_fix_keys num_keys k_length v_length set_opt\n");
		exit(-1);
	}
   char *key_prefix = "4";
	strcpy(host, argv[1]);

	daddr = inet_addr(host);
	dport = htons(atoi(argv[2]));
	saddr = INADDR_ANY;


	sum_iter = atoi(argv[3]);
	concurrency = atoi(argv[4]);
	core_limit = atoi(argv[5]);

	gen_fix_keys = atoi(argv[6]) ? true : false;
	num_keys = atoi(argv[7]);
	k_length = atoi(argv[8]);
	v_length = atoi(argv[9]);
	set_opt = atoi(argv[10]);
	if(set_opt)
		dt_len = sizeof(binary_set_req_header) + k_length + v_length;
	else
	dt_len = sizeof(binary_get_req_header) + k_length;
    init_k_v_pair_list(num_keys, key_prefix); //shared by all threads

	memset(trans_iter, 0, sizeof(trans_iter));

	num_cores = GetNumCPUs();
	core_limit = (core_limit > num_cores) ? num_cores : core_limit;
	mtcp_getconf(&mcfg);
	mcfg.num_cores = core_limit;
	mtcp_setconf(&mcfg);

	concurrency = concurrency / core_limit;
	max_fds = concurrency * 3;

	ret = mtcp_init("epserver.conf");
	if (ret) {
		TRACE_ERROR("Failed to initialize mtcp.\n");
		exit(EXIT_FAILURE);
	}

	mtcp_register_signal(SIGINT, SignalHandler);


	for (i = 0; i < core_limit; i++) {
		cores[i] = i;
		done[i] = FALSE;
		thread_count[i] = 0;
        prv_count[i] = 0;
		printf("creating thread\n");
		if (pthread_create(&app_thread[i], 
					NULL, RunWgetMain, (void *)&cores[i])) {
			printf("error in creating thread\n");
			perror("pthread_create");
			TRACE_ERROR("Failed to create wget thread.\n");
			exit(-1);
		}
        if (pthread_create(&pth[i], NULL, log_thread, (void *)&cores[i])) {
            printf("error in creating log_thread\n");
            perror("pthread_create");
            exit(-1);
        }

	}


	for (i = 0; i < core_limit; i++) {
		pthread_join(app_thread[i], NULL);
		TRACE_INFO("Wget thread %d joined.\n", i);
	}

	mtcp_destroy();
	return 0;
}


/*
 * Claim the first free fake-fd slot (index >= 1) in ctx->wvars for the mTCP
 * socket `sockfd`.
 *
 * The slot is zeroed, marked in use with tmpindex = 1, and recorded in
 * realfd_to_fakefd[sockfd]. biggest_fd is raised to cover it.
 *
 * @return the slot index, or -1 when all PM_FAKEFDS - 1 slots are taken.
 *         Previously exhaustion returned PM_FAKEFDS, an out-of-range index,
 *         and the newly claimed slot never updated biggest_fd.
 *
 * NOTE(review): RunWgetMain allocates wvars with max_fds entries; this
 * assumes PM_FAKEFDS <= max_fds.
 */
int find_fake_fd(thread_context_t ctx, int sockfd)
{
    int tmpi;

    for (tmpi = 1; tmpi < PM_FAKEFDS; tmpi++) {
        if (ctx->wvars[tmpi].use == 0)
            break;
    }
    if (tmpi == PM_FAKEFDS)
        return -1;

    memset(&ctx->wvars[tmpi], 0, sizeof(struct wget_vars));
    ctx->wvars[tmpi].fd = sockfd;
    ctx->wvars[tmpi].tmpfd = 0;
    ctx->wvars[tmpi].tmpindex = 1;
    ctx->wvars[tmpi].resend = 0;
    ctx->realfd_to_fakefd[sockfd] = tmpi; // so that realfd could find fake fd
    ctx->wvars[tmpi].use = 1;

    /* Every slot scanned before tmpi is lower, so this covers them too. */
    if (tmpi > ctx->biggest_fd)
        ctx->biggest_fd = tmpi;
    return tmpi;
}

/*
 * Move the state of fake fd `fakefd`'s replacement connection (its tmpfd)
 * into slot `fakefd`. The replacement's old slot is cleared, and
 * realfd_to_fakefd is updated so events on that socket now map to `fakefd`.
 *
 * Afterwards biggest_fd is lowered to the highest slot whose fd is non-zero.
 *
 * Fixes over the previous version:
 * - The scan left biggest_fd one below the last used slot and could run
 *   past index 0.
 * - When both slots were the same, memcpy was called on overlapping memory.
 *
 * @return always 0.
 */
int reflesh_fake_fd(thread_context_t ctx, int fakefd)
{
    int tmpfd = ctx->wvars[fakefd].tmpfd;
    int tmpfakefd = ctx->realfd_to_fakefd[tmpfd];
    int tmpi;

    if (tmpfakefd != fakefd) {
        memcpy(&ctx->wvars[fakefd], &ctx->wvars[tmpfakefd], sizeof(struct wget_vars));
        memset(&ctx->wvars[tmpfakefd], 0, sizeof(struct wget_vars));
        ctx->realfd_to_fakefd[ctx->wvars[fakefd].fd] = fakefd;
    }

    tmpi = ctx->biggest_fd;
    while (tmpi > 0 && !ctx->wvars[tmpi].fd)
        tmpi--;
    ctx->biggest_fd = tmpi;

    return 0;
}


/*----------------------------------------------------------------------------*/
/*
 * Open a non-blocking mTCP connection to `addr`.
 *
 * The socket is bound to a fake-fd slot and registered for EPOLLOUT; the
 * started/pending/connects counters are bumped.
 *
 * @return the mTCP socket id, or -1 on failure. The socket is closed and any
 *         claimed slot released on failure; previously a full slot table was
 *         ignored and the slot leaked when connect failed.
 *
 * Exits the process if the socket cannot be made non-blocking.
 */
inline int
pm_createConnection(thread_context_t ctx, const struct sockaddr *addr, socklen_t addrlen)
{
    mctx_t mctx = ctx->mctx;
    struct mtcp_epoll_event ev;
    int sockid;
    int fakefd;
    int ret;

    sockid = mtcp_socket(mctx, AF_INET, SOCK_STREAM, 0);
    if (sockid < 0) {
        TRACE_INFO("Failed to create socket!\n");
        return -1;
    }
    printf("CreateConnection: %d\n", sockid);
    fakefd = find_fake_fd(ctx, sockid);
    printf("the %dth socket %d\n", fakefd, sockid);
    if (fakefd < 0) {
        TRACE_ERROR("No free fake fd for socket %d.\n", sockid);
        mtcp_close(mctx, sockid);
        return -1;
    }

    ret = mtcp_setsock_nonblock(mctx, sockid);
    if (ret < 0) {
        TRACE_ERROR("Failed to set socket in nonblocking mode.\n");
        exit(-1);
    }

    ret = mtcp_connect(mctx, sockid, addr, addrlen);
    if (ret < 0 && errno != EINPROGRESS) {
        perror("mtcp_connect");
        /* Give the slot back so it is not swept as a live connection. */
        ctx->wvars[fakefd].use = 0;
        ctx->realfd_to_fakefd[sockid] = 0;
        mtcp_close(mctx, sockid);
        return -1;
    }

    ctx->started++;
    ctx->pending++;
    ctx->stat.connects++;

    ev.events = MTCP_EPOLLOUT;
    ev.data.sockid = sockid;
    mtcp_epoll_ctl(mctx, ctx->ep, MTCP_EPOLL_CTL_ADD, sockid, &ev);

    return sockid;
}

/*
 * Close mTCP socket `sockid`: remove it from epoll, close it, and release
 * the fake-fd slot mapped to it.
 *
 * Releasing the slot stops pm_epoll_wait from sweeping, and possibly
 * closing again, a slot whose socket is gone. Previously only some callers
 * cleared `use`.
 *
 * Updates the pending/done counters and sets done[core] once every
 * connection opened on this core has been closed.
 */
inline void
CloseConnection(thread_context_t ctx, int sockid)
{
    int fakefd = ctx->realfd_to_fakefd[sockid];

    mtcp_epoll_ctl(ctx->mctx, ctx->ep, MTCP_EPOLL_CTL_DEL, sockid, NULL);
    mtcp_close(ctx->mctx, sockid);

    /* Only release the slot if it still belongs to this socket. */
    if (fakefd > 0 && ctx->wvars[fakefd].fd == sockid) {
        ctx->wvars[fakefd].use = 0;
        ctx->realfd_to_fakefd[sockid] = 0;
    }

    printf("CloseConnection: %d\n", sockid);
    ctx->pending--;
    ctx->done++;
    assert(ctx->pending >= 0);

    if (ctx->done == ctx->stat.connects)
        done[ctx->core] = TRUE;
}




/*
 * epoll_ctl on a fake fd: translate `sockid` (a ctx->wvars index) to its mTCP
 * socket and forward every other argument unchanged to mtcp_epoll_ctl.
 */
int
pm_epoll_ctl(thread_context_t ctx, int epid,
        int op, int sockid, struct mtcp_epoll_event *event)
{
    return mtcp_epoll_ctl(ctx->mctx, epid, op, ctx->wvars[sockid].fd, event);
}


/*
 * mtcp_epoll_wait plus the fake-fd migration sweep.
 *
 * 1. Rewrites each returned event's sockid from the mTCP socket to its fake
 *    fd and marks that slot active.
 * 2. For every in-use slot:
 *    - If per_core_latency[core] exceeds LATENCY_LINE and the slot has not
 *      yet spawned a replacement (tmpindex == 0), it opens one to
 *      SERV_IP:SERV_PORT and records it in tmpfd.
 *    - Slots that were idle this round and have a replacement are retired:
 *      closed immediately if no reply is outstanding. Otherwise the pending
 *      request is copied to the replacement for resend, then the slot is
 *      closed.
 *
 * Fixes over the previous version:
 * - A failed spawn indexed realfd_to_fakefd[-1].
 * - The resend path indexed wvars with a real socket id instead of a fake fd.
 * - The binary request (which contains NUL bytes) was copied with strcpy;
 *   it is now a dt_len-byte memcpy.
 *
 * @return the mtcp_epoll_wait result.
 */
int
pm_epoll_wait(thread_context_t ctx, int epid,
        struct mtcp_epoll_event *events, int maxevents, int timeout)
{
    int evnum, tmpi;
    struct timeval nowtime;
    struct wget_vars *tmpfakefd;

    evnum = mtcp_epoll_wait(ctx->mctx, epid, events, maxevents, timeout);
    gettimeofday(&nowtime, NULL); // time here
    for (tmpi = 0; tmpi < evnum; tmpi++) {
        events[tmpi].data.sockid = ctx->realfd_to_fakefd[events[tmpi].data.sockid];
        ctx->wvars[events[tmpi].data.sockid].active = 1; // mark this fd as actived
    }

    for (tmpi = 0; tmpi <= ctx->biggest_fd; tmpi++) {
        tmpfakefd = &ctx->wvars[tmpi];
        if (!tmpfakefd->use)
            continue;

        if (per_core_latency[ctx->core] > LATENCY_LINE && !tmpfakefd->tmpindex) {
            /* Latency above the threshold: open a replacement connection. */
            struct sockaddr_in addr;
            int newsockfd;

            memset(&addr, 0, sizeof(addr));
            addr.sin_family = AF_INET;
            addr.sin_addr.s_addr = inet_addr(SERV_IP);
            addr.sin_port = htons(SERV_PORT);

            newsockfd = pm_createConnection(ctx, (struct sockaddr *)&addr, sizeof(struct sockaddr_in));
            if (newsockfd < 0) {
                printf("connect server failed, failed to create new connections...\n");
            } else {
                tmpfakefd->tmpfd = newsockfd;
                ctx->wvars[ctx->realfd_to_fakefd[newsockfd]].tmpindex = 1;
            }
            /* Do not retry every wait, even if the spawn failed. */
            tmpfakefd->tmpindex = 1;
        }

        if (tmpfakefd->active) {
            tmpfakefd->active = 0;
        } else if (tmpfakefd->tmpfd) {
            /* Idle slot with a replacement: retire it. */
            if (tmpfakefd->read_times >= tmpfakefd->write_times) {
                tmpfakefd->use = 0;
                CloseConnection(ctx, tmpfakefd->fd);
            } else {
                if (tmpfakefd->pretime.tv_sec == 0) { // not set time
                    gettimeofday(&tmpfakefd->pretime, NULL);
                    printf("Here init fakefd\'s time\n");
                } else if (nowtime.tv_sec - tmpfakefd->pretime.tv_sec >= 1) {
                    int newfakefd = ctx->realfd_to_fakefd[tmpfakefd->tmpfd];

                    if (newfakefd > 0) {
                        ctx->wvars[newfakefd].resend = 1;
                        memcpy(ctx->wvars[newfakefd].kv_buf, tmpfakefd->kv_buf, dt_len);
                    }
                    tmpfakefd->use = 0;
                    CloseConnection(ctx, tmpfakefd->fd);
                }
                /* NOTE(review): pretime is refreshed on every sweep, so the
                 * 1 s check above measures the gap between consecutive waits,
                 * not time since the slot went idle. Confirm intent. */
                tmpfakefd->pretime = nowtime;
            }
        }
    }
    return evnum;
}


/*
 * Write `len` bytes of `buf` on fake fd `sockid`: look up the slot's mTCP
 * socket and hand the buffer to mtcp_write, returning its result verbatim.
 */
ssize_t
pm_write(thread_context_t ctx, int sockid, char *buf, size_t len)
{
    struct wget_vars *slot = &ctx->wvars[sockid];

    return mtcp_write(ctx->mctx, slot->fd, buf, len);
}

/*
 * Read up to `len` bytes into `buf` from fake fd `sockid`, via the slot's
 * mTCP socket; mtcp_read's return value is passed through unchanged.
 */
ssize_t pm_read(thread_context_t ctx, int sockid, char *buf, size_t len)
{
    struct wget_vars *slot = &ctx->wvars[sockid];

    return mtcp_read(ctx->mctx, slot->fd, buf, len);
}



/* Map fake fd `fd` (an index into ctx->wvars) to its mTCP socket id. */
int pm_get_real_fd(thread_context_t ctx, int fd) {
    return ctx->wvars[fd].fd;
}






/*----------------------------------------------------------------------------*/
